Add deterministic code and snippet memory identity#181
Conversation
There was a problem hiding this comment.
Code Review
This pull request implements deterministic judging for code annotations and snippets, centralizing parsing and metadata generation logic within src/schemas/code.py. The JudgeAgent is updated to handle these new domains by performing metadata lookups to decide between adding, updating, or skipping items. Feedback suggests optimizing the _deterministic_code and _deterministic_snippet methods by deduplicating incoming items and using asyncio.gather for parallel metadata lookups to improve performance and prevent redundant operations.
| async def _deterministic_code( | ||
| self, new_items: list, user_id: str, | ||
| ) -> JudgeResult: | ||
| operations: list[Operation] = [] | ||
| for item in new_items: | ||
| content = str(item) | ||
| fields = code_annotation_fields_from_storage_content(content) | ||
| match = await self._lookup_metadata_match({ | ||
| "user_id": user_id, | ||
| "domain": JudgeDomain.CODE.value, | ||
| "annotation_key": code_annotation_identity_key(fields), | ||
| }) | ||
|
|
||
| if match is None: | ||
| operations.append(Operation( | ||
| type=OperationType.ADD, | ||
| content=content, | ||
| reason="No code annotation with the same repo/target/type key.", | ||
| )) | ||
| continue | ||
|
|
||
| incoming_hash = code_annotation_content_hash(fields) | ||
| existing_hash = str((match.metadata or {}).get("annotation_hash", "")) | ||
| if incoming_hash == existing_hash: | ||
| operations.append(Operation( | ||
| type=OperationType.NOOP, | ||
| content=content, | ||
| embedding_id=match.id, | ||
| reason="Existing code annotation is unchanged.", | ||
| )) | ||
| else: | ||
| operations.append(Operation( | ||
| type=OperationType.UPDATE, | ||
| content=content, | ||
| embedding_id=match.id, | ||
| reason="Existing code annotation target has updated content.", | ||
| )) | ||
|
|
||
| return JudgeResult(operations=operations, confidence=1.0) |
There was a problem hiding this comment.
The _deterministic_code method processes items sequentially and lacks deduplication of the incoming new_items. If multiple identical annotations are extracted in a single turn, this will result in redundant operations and potential duplicate records in the vector store. It is recommended to deduplicate items by their identity key and use asyncio.gather to perform metadata lookups in parallel, maintaining consistency with the profile and temporal domains.
async def _deterministic_code(
self, new_items: list, user_id: str,
) -> JudgeResult:
# Deduplicate items by identity key to prevent redundant operations
unique_items: dict[str, tuple[str, dict]] = {}
for item in new_items:
content = str(item)
fields = code_annotation_fields_from_storage_content(content)
key = code_annotation_identity_key(fields)
unique_items[key] = (content, fields)
async def _process_one(content: str, fields: dict) -> Operation:
match = await self._lookup_metadata_match({
"user_id": user_id,
"domain": JudgeDomain.CODE.value,
"annotation_key": code_annotation_identity_key(fields),
})
if match is None:
return Operation(
type=OperationType.ADD,
content=content,
reason="No code annotation with the same repo/target/type key.",
)
incoming_hash = code_annotation_content_hash(fields)
existing_hash = str((match.metadata or {}).get("annotation_hash", ""))
if incoming_hash == existing_hash:
return Operation(
type=OperationType.NOOP,
content=content,
embedding_id=match.id,
reason="Existing code annotation is unchanged.",
)
else:
return Operation(
type=OperationType.UPDATE,
content=content,
embedding_id=match.id,
reason="Existing code annotation target has updated content.",
)
tasks = [_process_one(c, f) for c, f in unique_items.values()]
operations = await asyncio.gather(*tasks)
return JudgeResult(operations=list(operations), confidence=1.0)| async def _deterministic_snippet( | ||
| self, new_items: list, user_id: str, | ||
| ) -> JudgeResult: | ||
| operations: list[Operation] = [] | ||
| for item in new_items: | ||
| content = str(item) | ||
| fields = snippet_fields_from_storage_content(content) | ||
| match = await self._lookup_metadata_match({ | ||
| "user_id": user_id, | ||
| "domain": JudgeDomain.SNIPPET.value, | ||
| "snippet_hash": snippet_identity_hash(fields), | ||
| }) | ||
|
|
||
| if match is None: | ||
| operations.append(Operation( | ||
| type=OperationType.ADD, | ||
| content=content, | ||
| reason="No snippet with the same normalized code/content identity.", | ||
| )) | ||
| else: | ||
| operations.append(Operation( | ||
| type=OperationType.NOOP, | ||
| content=content, | ||
| embedding_id=match.id, | ||
| reason="Same snippet was already stored for this user.", | ||
| )) | ||
|
|
||
| return JudgeResult(operations=operations, confidence=1.0) |
There was a problem hiding this comment.
Similar to _deterministic_code, the _deterministic_snippet method should deduplicate new_items by their identity hash and parallelize the metadata lookups using asyncio.gather to improve performance and prevent duplicate operations.
async def _deterministic_snippet(
self, new_items: list, user_id: str,
) -> JudgeResult:
# Deduplicate items by snippet hash to prevent redundant operations
unique_items: dict[str, tuple[str, dict]] = {}
for item in new_items:
content = str(item)
fields = snippet_fields_from_storage_content(content)
h = snippet_identity_hash(fields)
unique_items[h] = (content, fields)
async def _process_one(content: str, fields: dict) -> Operation:
match = await self._lookup_metadata_match({
"user_id": user_id,
"domain": JudgeDomain.SNIPPET.value,
"snippet_hash": snippet_identity_hash(fields),
})
if match is None:
return Operation(
type=OperationType.ADD,
content=content,
reason="No snippet with the same normalized code/content identity.",
)
else:
return Operation(
type=OperationType.NOOP,
content=content,
embedding_id=match.id,
reason="Same snippet was already stored for this user.",
)
tasks = [_process_one(c, f) for c, f in unique_items.values()]
operations = await asyncio.gather(*tasks)
return JudgeResult(operations=list(operations), confidence=1.0)|
Pushed a follow-up in |
There was a problem hiding this comment.
@hunterbastian thanks for the update. The overall direction looks good, especially moving code/snippet judging to deterministic metadata lookups and addressing the dedupe/parallel lookup feedback.
I’d like to hold off on merging until two identity issues are fixed:
-
code_annotation_identity_key()should include bothtarget_fileandtarget_symbolwhen present. Right now it usestarget_symbolovertarget_file, so the same symbol name in different files within a repo can collide. -
snippet_identity_hash()should preserve code identity more strictly. Sincestable_hash()lowercases and collapses whitespace, two different code snippets can be treated as the same snippet. For code snippets, we should hash the normalized code text without lowercasing/collapsing internal whitespace.
Please add regression tests for both cases and rerun the full CI once the workflow is approved.
|
Pushed follow-up in |
|
@hunterbastian Please have a discussion on the issue #141 so that we can discuss the approach first and then you can implement in the PR. |
|
| Filename | Overview |
|---|---|
| src/schemas/code.py | Adds deterministic identity helpers (parse, hash, metadata builders) for code annotations and snippets; code_annotation_content_hash uses the case-insensitive stable_hash, which can silently suppress updates when only annotation body casing changes. |
| src/agents/judge.py | Routes CODE and SNIPPET domains through new deterministic paths; _lookup_metadata_match correctly uses asyncio.to_thread matching the existing profile pattern, but has no fallback when search_by_metadata is unavailable. |
| src/pipelines/ingest.py | Replaces LLM judge calls with ephemeral deterministic JudgeAgent instances for code and snippet domains; correctly binds snippet_vector_store before both the judge and the weaver now. |
| src/pipelines/weaver.py | Replaces inline metadata dicts and local parser functions with schema helpers; embedding text for snippets is now richer (description + language + tags) instead of bare description. |
| tests/test_deterministic_memory_layer.py | Adds four integration tests covering cross-session snippet dedup, batch dedup, metadata persistence, and code annotation update detection; FakeVectorStore correctly models metadata equality search. |
| tests/unit/test_schemas.py | Adds unit tests for all new schema helpers; verifies hash stability, identity key format, and metadata field values. |
Sequence Diagram
sequenceDiagram
participant Ingest as IngestPipeline
participant Judge as JudgeAgent (ephemeral)
participant Schema as schemas/code.py
participant Store as VectorStore (Pinecone)
participant Weaver as Weaver
Ingest->>Judge: "arun_deterministic({domain: CODE/SNIPPET, new_items, user_id})"
Judge->>Schema: code_annotation_fields_from_storage_content(content)
Schema-->>Judge: fields dict
Judge->>Schema: code_annotation_identity_key(fields) / snippet_identity_hash(fields)
Schema-->>Judge: identity key / hash
Judge->>Store: "search_by_metadata({user_id, domain, annotation_key/snippet_hash})"
Store-->>Judge: SearchResult or None
alt No match found
Judge-->>Ingest: Operation(ADD)
else Match found, hash unchanged
Judge-->>Ingest: Operation(NOOP)
else Match found, hash changed (code only)
Judge-->>Ingest: Operation(UPDATE)
end
Ingest->>Weaver: execute(judge_result, domain, user_id)
Weaver->>Schema: code_annotation_pinecone_metadata / snippet_pinecone_metadata
Schema-->>Weaver: metadata dict (with annotation_key/annotation_hash or snippet_hash)
Weaver->>Store: add / update with enriched metadata
Reviews (1): Last reviewed commit: "Fix deterministic code identity collisio..." | Re-trigger Greptile
| def code_annotation_content_hash(fields: dict[str, Any]) -> str: | ||
| return stable_hash( | ||
| code_annotation_identity_key(fields), | ||
| fields.get("severity"), | ||
| fields.get("content"), | ||
| ) |
There was a problem hiding this comment.
code_annotation_content_hash calls stable_hash, which runs every part through normalize_lookup_text (lowercase + collapse-whitespace). A case-only change to an annotation body — e.g. correcting Auth.login → auth.login in the free-text content, or capitalizing a variable name — produces the same hash as the original, so the deterministic judge returns NOOP and silently discards the update. Code identifiers in annotation text can be case-significant.
| def code_annotation_content_hash(fields: dict[str, Any]) -> str: | |
| return stable_hash( | |
| code_annotation_identity_key(fields), | |
| fields.get("severity"), | |
| fields.get("content"), | |
| ) | |
| def code_annotation_content_hash(fields: dict[str, Any]) -> str: | |
| return strict_hash( | |
| code_annotation_identity_key(fields), | |
| fields.get("severity"), | |
| fields.get("content"), | |
| ) |
| async def _lookup_metadata_match( | ||
| self, filters: Dict[str, Any], | ||
| ) -> Optional[SearchResult]: | ||
| if not self.vector_store: | ||
| return None | ||
| search_fn = getattr(self.vector_store, "search_by_metadata", None) | ||
| if search_fn is None: | ||
| return None | ||
| results = await asyncio.to_thread(search_fn, filters=filters, top_k=1) | ||
| return _first_match(results or []) |
There was a problem hiding this comment.
No fallback when
search_by_metadata is absent
_lookup_metadata_match silently returns None when the injected vector store lacks a search_by_metadata method. Every incoming CODE or SNIPPET item then resolves to OperationType.ADD, so the same snippet or code annotation will be re-inserted on every session rather than deduped. The existing _fetch_similar_profile_metadata falls back to a semantic search in this case; applying the same fallback (or at least a warning) here would keep the two deterministic paths consistent and avoid silent duplicate growth.
Summary
Implements deterministic identity metadata for code annotations and personal snippets so XMem can avoid re-judging exact code/snippet memories with an LLM.
Changes:
snippet_hash,annotation_key, andannotation_hashin Pinecone metadataThis addresses the edge case discussed in #141 where a user sends a snippet, then asks for the same snippet in another session. The normalized
snippet_hashlets the judge no-op the duplicate without another model call.Verification
python3 -m compileall src/schemas/code.py src/agents/judge.py src/pipelines/ingest.py src/pipelines/weaver.py tests/unit/test_schemas.py tests/test_deterministic_memory_layer.pyuv run --extra dev pytest tests/unit/test_schemas.py tests/test_deterministic_memory_layer.py-> 12 passeduv run --extra dev pytest-> 44 passeduv run ruff check --select F401 src/schemas/code.py src/agents/judge.py src/pipelines/ingest.py src/pipelines/weaver.py tests/unit/test_schemas.py tests/test_deterministic_memory_layer.pygit diff --check/claim #141